/*
 * Copyright (c) 2022. China Mobile (SuZhou) Software Technology Co.,Ltd. All rights reserved.
 * Lakehouse is licensed under Mulan PSL v2.
 * You can use this software according to the terms and conditions of the Mulan PSL v2.
 * You may obtain a copy of Mulan PSL v2 at:
 *          http://license.coscl.org.cn/MulanPSL2
 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
 * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
 * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
 * See the Mulan PSL v2 for more details.
 */

package com.chinamobile.cmss.lakehouse.engine.meta.crawler

import com.chinamobile.cmss.lakehouse.engine.meta.crawler.constant.{ConfigPath, JobConfigOptions}
import com.chinamobile.cmss.lakehouse.engine.meta.crawler.util.{TryWith, Utils}
import com.typesafe.config.ConfigFactory
import org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.configuration.{ConfigConstants, Configuration, GlobalConfiguration}
import org.apache.flink.streaming.api.scala._
import org.slf4j.LoggerFactory

import java.sql.DriverManager

/**
 * Entry point of the metadata crawler Flink job.
 *
 * The job configuration (a Typesafe-config string) is either passed on the command line as
 * `--jobConfig`, or — when no arguments are given — read from a job-info table whose JDBC
 * coordinates come from the Flink configuration found in `FLINK_CONF_DIR`.
 */
object CrawlerApplication {
  val logger = LoggerFactory.getLogger(getClass)

  /**
   * Builds and runs the crawler pipeline in BATCH mode.
   *
   * Tables produced by [[Generator]] are flat-mapped through [[MapTable2]] and grouped by name.
   * Every table after the first with a given name gets a random short-uuid suffix. Names are
   * then prefixed with the target database and table prefix, the S3A endpoint/credentials are
   * merged into the table properties, and the result is sent to [[SimpleHttpSink]].
   *
   * @param args either `--jobConfig <config string>`, or empty to load the config from the
   *             job-info table (in which case that row is also marked as started)
   */
  def main(args: Array[String]): Unit = {
    // Without CLI args, the Flink configuration holds the JDBC coordinates of the job-info
    // table that the job config is read from and whose status is updated below.
    val flinkConf: Option[Configuration] =
      if (args.nonEmpty) None
      else {
        val flinkConfDir = System.getenv(ConfigConstants.ENV_FLINK_CONF_DIR)
        Some(GlobalConfiguration.loadConfiguration(flinkConfDir))
      }

    val jobConfig: String = flinkConf match {
      // only the config column is used; the monitor URL is currently not needed here
      case Some(conf) => getJobConfig(conf)._1
      // getRequired fails with a descriptive message instead of a later NPE in parseString
      case None => ParameterTool.fromArgs(args).getRequired("jobConfig")
    }

    val config = ConfigFactory.parseString(jobConfig)
    val tableNamePrefix = config.getString(ConfigPath.TARGET_PREFIX)
    val endpoint = config.getString(ConfigPath.SOURCE_ENDPOINT)
    val ak = config.getString(ConfigPath.SOURCE_AK)
    val sk = config.getString(ConfigPath.SOURCE_SK)
    // S3A settings attached to every emitted table so it can be read from object storage
    val ossTableProperties = Map(
      "fs.s3a.endpoint" -> endpoint,
      "fs.s3a.access.key" -> ak,
      "fs.s3a.secret.key" -> sk
    )
    val database = config.getString(ConfigPath.TARGET_INSTANCE)

    val env: StreamExecutionEnvironment =
      StreamExecutionEnvironment.getExecutionEnvironment
    env.setRuntimeMode(RuntimeExecutionMode.BATCH)

    val gen = Generator(config)
    val mayBeTables = gen.run()
    env
      .fromCollection(mayBeTables)
      .flatMap(MapTable2(config))
      .map(s => (s.name, List(s)))
      .keyBy(_._1)
      // gather all tables sharing a name into one list
      .reduce((t1, t2) => (t1._1, t1._2 ::: t2._2))
      // rename duplicates: the first keeps its name, the rest get a random suffix.
      // The list is never empty: it is built from List(s) and only ever concatenated.
      .flatMap { group =>
        val sameNameTables = group._2
        sameNameTables.head :: sameNameTables.tail.map(tbl =>
          tbl.copy(name = tbl.name + "_" + Utils.generateShortUuid())
        )
      }
      .map(s =>
        s.copy(
          name = database + "." + tableNamePrefix + s.name,
          properties = s.properties ++ ossTableProperties
        )
      )
      .addSink(SimpleHttpSink(config))

    val jobId = config.getString("jobId")
    // The job-info row is marked as started before execute() runs the pipeline.
    flinkConf.foreach(updateJobStatus)
    env.execute(jobId)
  }

  /**
   * Marks this cluster's row in the job-info table as started (`is_started = 1`).
   *
   * @param conf Flink configuration holding the job-info table name, cluster id and JDBC settings
   * @throws RuntimeException if no row matches the cluster id; JDBC/connection failures are
   *                          rethrown as well (assuming `TryWith` captures them in a `Failure`)
   */
  def updateJobStatus(conf: Configuration): Unit = {
    val tableName = conf.getString(JobConfigOptions.JOB_INFO_TABLE_NAME)
    logger.info(s"tableName: $tableName")
    val clusterId = conf.getString(JobConfigOptions.KUBERNETES_CLUSTER_ID)
    logger.info(s"clusterId: $clusterId")
    val connectionUrl = conf.getString(JobConfigOptions.JDBC_CONNECTION_URL)
    logger.info(s"connectionUrl: $connectionUrl")
    val username = conf.getString(JobConfigOptions.JDBC_CONNECTION_USERNAME)
    val password = conf.getString(JobConfigOptions.JDBC_CONNECTION_PASSWORD)
    // cluster_id is bound as a parameter; the table name cannot be bound and is quoted instead
    val sql = List(
      "update",
      Utils.surround("`", tableName),
      "SET `is_started` = 1 WHERE cluster_id = ?"
    ).mkString(" ")
    logger.info(s"execute sql: $sql")
    Class.forName(conf.getString(JobConfigOptions.JDBC_DRIVER_NAME))
    TryWith(DriverManager.getConnection(connectionUrl, username, password)) { conn =>
      TryWith(conn.prepareStatement(sql)) { stmt =>
        stmt.setString(1, clusterId)
        // Statement.execute() returns false for every UPDATE, so it cannot signal failure;
        // use the row count instead. NOTE(review): MySQL Connector/J reports matched rows by
        // default, so an already-started row still counts — confirm for the driver in use.
        val updatedRows = stmt.executeUpdate()
        if (updatedRows == 0) {
          val message = "Failed to update job status with cluster id : " + clusterId
          logger.error(message)
          throw new RuntimeException(message)
        }
      }
    }.flatMap(identity).get // surface failures instead of silently discarding the Try
  }

  /**
   * Reads the job configuration and monitor URL for this cluster from the job-info table.
   *
   * @param conf Flink configuration holding the table/column names, cluster id and JDBC settings
   * @return `(jobConfig, monitorUrl)`: the values of the configured config and monitor columns
   * @throws RuntimeException if no row matches the cluster id; JDBC/connection failures are
   *                          rethrown as well (assuming `TryWith` captures them in a `Failure`)
   */
  def getJobConfig(conf: Configuration): (String, String) = {
    val tableName = conf.getString(JobConfigOptions.JOB_INFO_TABLE_NAME)
    logger.info(s"tableName: $tableName")
    val clusterId = conf.getString(JobConfigOptions.KUBERNETES_CLUSTER_ID)
    logger.info(s"clusterId: $clusterId")
    val connectionUrl = conf.getString(JobConfigOptions.JDBC_CONNECTION_URL)
    logger.info(s"connectionUrl: $connectionUrl")
    val username = conf.getString(JobConfigOptions.JDBC_CONNECTION_USERNAME)
    val password = conf.getString(JobConfigOptions.JDBC_CONNECTION_PASSWORD)
    val jobConfigCol = conf.getString(JobConfigOptions.JOB_CONFIG_COLUMN_NAME)
    logger.info(s"column name: $jobConfigCol")

    val jobMonitorCol =
      conf.getString(JobConfigOptions.JOB_MONITOR_URL_COLUMN_NAME)
    logger.info(s"monitor name: $jobMonitorCol")
    // cluster_id is bound as a parameter; table/column names cannot be bound and are taken
    // verbatim from the (presumably trusted) Flink configuration.
    val sql =
      s"SELECT $jobConfigCol,$jobMonitorCol FROM $tableName WHERE cluster_id = ?"
    Class.forName(conf.getString(JobConfigOptions.JDBC_DRIVER_NAME))
    TryWith(DriverManager.getConnection(connectionUrl, username, password)) { conn =>
      TryWith(conn.prepareStatement(sql)) { stmt =>
        stmt.setString(1, clusterId)
        TryWith(stmt.executeQuery()) { rs =>
          if (rs.next()) {
            (rs.getString(jobConfigCol), rs.getString(jobMonitorCol))
          } else {
            throw new RuntimeException(
              "Failed to get job config and job monitor url with cluster id : " + clusterId
            )
          }
        }
      }.flatMap(identity)
    }.flatMap(identity).get
  }
}
